/*
 * Copyright (c) 2024, Alibaba Cloud.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.aliyun.dataworks.migrationx.transformer.core.spark.command;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.aliyun.dataworks.migrationx.transformer.core.spark.command.CommandBuilderUtils.*;

/**
 * Special command builder for handling a CLI invocation of SparkSubmit.
 * <p>
 * This builder adds command line parsing compatible with SparkSubmit. It handles setting driver-side options and
 * the special parsing behavior needed for special-casing certain internal Spark applications.
 * <p>
 * This class also has some special features to aid launching shells (pyspark and sparkR) as well as examples.
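 * <p>
 * A minimal usage sketch (the argument values below are hypothetical, and SPARK_HOME is assumed to be
 * configured for the underlying builder):
 * <pre>{@code
 * List<String> args = Arrays.asList(
 *     "--master", "yarn",
 *     "--deploy-mode", "cluster",
 *     "--class", "com.example.MyApp",   // hypothetical application class
 *     "/path/to/app.jar",               // hypothetical application resource
 *     "appArg1");
 * SparkSubmitCommandBuilder builder = new SparkSubmitCommandBuilder(args);
 * List<String> cmd = builder.buildCommand(new HashMap<>());
 * }</pre>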
 */
public class SparkSubmitCommandBuilder extends AbstractCommandBuilder {

    /**
     * Name of the app resource used to identify the PySpark shell. The command line parser expects the resource name to
     * be the very first argument to spark-submit in this case.
     * <p>
     * NOTE: this cannot be "pyspark-shell" since that identifies the PySpark shell to SparkSubmit (see
     * java_gateway.py), and can cause this code to enter into an infinite loop.
     */
    static final String PYSPARK_SHELL = "pyspark-shell-main";

    /**
     * This is the actual resource name that identifies the PySpark shell to SparkSubmit.
     */
    static final String PYSPARK_SHELL_RESOURCE = "pyspark-shell";

    /**
     * Name of the app resource used to identify the SparkR shell. The command line parser expects the resource name to
     * be the very first argument to spark-submit in this case.
     * <p>
     * NOTE: this cannot be "sparkr-shell" since that identifies the SparkR shell to SparkSubmit (see sparkR.R), and can
     * cause this code to enter into an infinite loop.
     */
    static final String SPARKR_SHELL = "sparkr-shell-main";

    /**
     * This is the actual resource name that identifies the SparkR shell to SparkSubmit.
     */
    static final String SPARKR_SHELL_RESOURCE = "sparkr-shell";

    /**
     * Name of the app resource used to identify examples. When running examples, args[0] should be this name. The app
     * resource will identify the example class to run.
     */
    static final String RUN_EXAMPLE = "run-example";

    /**
     * Prefix for example class names.
     */
    static final String EXAMPLE_CLASS_PREFIX = "org.apache.spark.examples.";

    /**
     * This map must match the class names for available special classes, since this modifies the way command line
     * parsing works. This maps the class name to the resource to use when calling spark-submit.
     */
    private static final Map<String, String> specialClasses = new HashMap<>();

    static {
        specialClasses.put("org.apache.spark.repl.Main", "spark-shell");
        specialClasses.put("org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver",
            SparkLauncher.NO_RESOURCE);
        specialClasses.put("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",
            SparkLauncher.NO_RESOURCE);
    }

    final List<String> userArgs;
    private final List<String> parsedArgs;
    // A special command requires neither an appResource nor a mainClass.
    private final boolean isSpecialCommand;
    private final boolean isExample;

    /**
     * Controls whether mixing spark-submit arguments with app arguments is allowed. This is needed to parse the command
     * lines for things like bin/spark-shell, which allows users to mix and match arguments (e.g. "bin/spark-shell
     * SparkShellArg --master foo").
     */
    private boolean allowsMixedArguments;

    /**
     * This constructor is used when creating a user-configurable launcher. It allows the spark-submit argument list to
     * be modified after creation.
     */
    public SparkSubmitCommandBuilder() {
        this.isSpecialCommand = false;
        this.isExample = false;
        this.parsedArgs = new ArrayList<>();
        this.userArgs = new ArrayList<>();
    }

    /**
     * This constructor is used when invoking spark-submit; it parses and validates arguments provided by the user on
     * the command line.
     */
    public SparkSubmitCommandBuilder(List<String> args) {
        this.allowsMixedArguments = false;
        this.parsedArgs = new ArrayList<>();
        boolean isExample = false;
        List<String> submitArgs = args;
        this.userArgs = Collections.emptyList();

        if (!args.isEmpty()) {
            switch (args.get(0)) {
                case PYSPARK_SHELL:
                    this.allowsMixedArguments = true;
                    appResource = PYSPARK_SHELL;
                    submitArgs = args.subList(1, args.size());
                    break;

                case SPARKR_SHELL:
                    this.allowsMixedArguments = true;
                    appResource = SPARKR_SHELL;
                    submitArgs = args.subList(1, args.size());
                    break;

                case RUN_EXAMPLE:
                    isExample = true;
                    appResource = SparkLauncher.NO_RESOURCE;
                    submitArgs = args.subList(1, args.size());
                    break;
            }

            this.isExample = isExample;
            OptionParser parser = new OptionParser(true);
            parser.parse(submitArgs);
            this.isSpecialCommand = parser.isSpecialCommand;
        } else {
            this.isExample = isExample;
            this.isSpecialCommand = true;
        }
    }

    @Override
    public List<String> buildCommand(Map<String, String> env)
        throws IOException, IllegalArgumentException {
        if (PYSPARK_SHELL.equals(appResource) && !isSpecialCommand) {
            return buildPySparkShellCommand(env);
        } else if (SPARKR_SHELL.equals(appResource) && !isSpecialCommand) {
            return buildSparkRCommand(env);
        } else {
            return buildSparkSubmitCommand(env);
        }
    }

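    /**
     * Assembles the spark-submit argument list from the builder's state. For example (illustrative
     * values), a builder configured with master "yarn" and main class "com.example.MyApp" yields
     * something like ["--master", "yarn", "--class", "com.example.MyApp", "/path/to/app.jar", "arg1"].
     */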
    List<String> buildSparkSubmitArgs() {
        List<String> args = new ArrayList<>();
        OptionParser parser = new OptionParser(false);
        final boolean isSpecialCommand;

        // If the user args list is not empty, we need to parse it to detect exactly what
        // the user is trying to run, so that the checks below are correct.
        if (!userArgs.isEmpty()) {
            parser.parse(userArgs);
            isSpecialCommand = parser.isSpecialCommand;
        } else {
            isSpecialCommand = this.isSpecialCommand;
        }

        if (!allowsMixedArguments && !isSpecialCommand) {
            checkArgument(appResource != null, "Missing application resource.");
        }

        if (verbose) {
            args.add(parser.VERBOSE);
        }

        if (master != null) {
            args.add(parser.MASTER);
            args.add(master);
        }

        if (deployMode != null) {
            args.add(parser.DEPLOY_MODE);
            args.add(deployMode);
        }

        if (appName != null) {
            args.add(parser.NAME);
            args.add(appName);
        }

        for (Map.Entry<String, String> e : conf.entrySet()) {
            args.add(parser.CONF);
            args.add(String.format("%s=%s", e.getKey(), e.getValue()));
        }

        if (propertiesFile != null) {
            args.add(parser.PROPERTIES_FILE);
            args.add(propertiesFile);
        }

        if (isExample) {
            jars.addAll(findExamplesJars());
        }

        if (!jars.isEmpty()) {
            args.add(parser.JARS);
            args.add(join(",", jars));
        }

        if (!files.isEmpty()) {
            args.add(parser.FILES);
            args.add(join(",", files));
        }

        if (!pyFiles.isEmpty()) {
            args.add(parser.PY_FILES);
            args.add(join(",", pyFiles));
        }

        if (isExample && !isSpecialCommand) {
            checkArgument(mainClass != null, "Missing example class name.");
        }

        if (mainClass != null) {
            args.add(parser.CLASS);
            args.add(mainClass);
        }

        args.addAll(parsedArgs);
        if (appResource != null) {
            args.add(appResource);
        }
        args.addAll(appArgs);

        return args;
    }

    private List<String> buildSparkSubmitCommand(Map<String, String> env)
        throws IOException, IllegalArgumentException {
        // Load the properties file and check whether spark-submit will be running the app's driver
        // or just launching a cluster app. When running the driver, the JVM's arguments will be
        // modified to cover the driver's configuration.
        Map<String, String> config = getEffectiveConfig();
        boolean isClientMode = isClientMode(config);
        String extraClassPath = isClientMode ? config.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH) : null;

        List<String> cmd = buildJavaCommand(extraClassPath);
        // Treat the Thrift Server as a daemon.
        if (isThriftServer(mainClass)) {
            addOptionString(cmd, System.getenv("SPARK_DAEMON_JAVA_OPTS"));
        }
        addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));

        // We don't want the client to specify Xmx directly. The heap size has to be set via the
        // corresponding memory flag --driver-memory or the configuration entry spark.driver.memory.
        String driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
        if (!isEmpty(driverExtraJavaOptions) && driverExtraJavaOptions.contains("Xmx")) {
            String msg = String.format("Not allowed to specify max heap(Xmx) memory settings through " +
                "java options (was %s). Use the corresponding --driver-memory or " +
                "spark.driver.memory configuration instead.", driverExtraJavaOptions);
            throw new IllegalArgumentException(msg);
        }

        if (isClientMode) {
            // Figuring out where the memory value comes from is a little tricky due to precedence.
            // Precedence is observed in the following order:
            // - explicit configuration (setConf()), which also covers --driver-memory cli argument.
            // - properties file.
            // - SPARK_DRIVER_MEMORY env variable
            // - SPARK_MEM env variable
            // - default value (1g)
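            // Illustrative: setConf("spark.driver.memory", "2g") wins over SPARK_DRIVER_MEMORY=4g
            // in the environment, yielding -Xmx2g.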
            // Treat the Thrift Server as a daemon.
            String tsMemory =
                isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : null;
            String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
                System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
            cmd.add("-Xmx" + memory);
            addOptionString(cmd, driverExtraJavaOptions);
            mergeEnvPathList(env, getLibPathEnvName(),
                config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
        }

        cmd.add("org.apache.spark.deploy.SparkSubmit");
        cmd.addAll(buildSparkSubmitArgs());
        return cmd;
    }

    private List<String> buildPySparkShellCommand(Map<String, String> env) throws IOException {
        // Running a python application directly through 'pyspark' was removed in Spark 2.0;
        // reject such invocations and point the user at spark-submit instead.
        if (!appArgs.isEmpty() && appArgs.get(0).endsWith(".py")) {
            System.err.println(
                "Running python applications through 'pyspark' is not supported as of Spark 2.0.\n" +
                    "Use ./bin/spark-submit <python file>");
            System.exit(-1);
        }

        checkArgument(appArgs.isEmpty(), "pyspark does not support any application options.");

        // When launching the pyspark shell, the spark-submit arguments should be stored in the
        // PYSPARK_SUBMIT_ARGS env variable.
        appResource = PYSPARK_SHELL_RESOURCE;
        constructEnvVarArgs(env, "PYSPARK_SUBMIT_ARGS");

        // The python executable is picked up in the following order:
        // 1. conf spark.pyspark.driver.python
        // 2. conf spark.pyspark.python
        // 3. environment variable PYSPARK_DRIVER_PYTHON
        // 4. environment variable PYSPARK_PYTHON
        // 5. python
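        // Illustrative: with spark.pyspark.driver.python=python3 set in conf and
        // PYSPARK_PYTHON=python2 in the environment, "python3" is chosen.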
        List<String> pyargs = new ArrayList<>();
        pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
            conf.get(SparkLauncher.PYSPARK_PYTHON),
            System.getenv("PYSPARK_DRIVER_PYTHON"),
            System.getenv("PYSPARK_PYTHON"),
            "python"));
        String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
        if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
            // pass conf spark.pyspark.python to python by environment variable.
            env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
        }
        if (!isEmpty(pyOpts)) {
            pyargs.addAll(parseOptionString(pyOpts));
        }

        return pyargs;
    }

    private List<String> buildSparkRCommand(Map<String, String> env) throws IOException {
        if (!appArgs.isEmpty() && appArgs.get(0).endsWith(".R")) {
            System.err.println(
                "Running R applications through 'sparkR' is not supported as of Spark 2.0.\n" +
                    "Use ./bin/spark-submit <R file>");
            System.exit(-1);
        }
        // When launching the SparkR shell, store the spark-submit arguments in the SPARKR_SUBMIT_ARGS
        // env variable.
        appResource = SPARKR_SHELL_RESOURCE;
        constructEnvVarArgs(env, "SPARKR_SUBMIT_ARGS");

        // Set shell.R as R_PROFILE_USER to load the SparkR package when the shell comes up.
        String sparkHome = System.getenv("SPARK_HOME");
        env.put("R_PROFILE_USER",
            join(File.separator, sparkHome, "R", "lib", "SparkR", "profile", "shell.R"));

        List<String> args = new ArrayList<>();
        args.add(firstNonEmpty(conf.get(SparkLauncher.SPARKR_R_SHELL),
            System.getenv("SPARKR_DRIVER_R"), "R"));
        return args;
    }

    private void constructEnvVarArgs(
        Map<String, String> env,
        String submitArgsEnvVariable) throws IOException {
        mergeEnvPathList(env, getLibPathEnvName(),
            getEffectiveConfig().get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));

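        // Join the quoted args into one space-separated string, e.g. (illustrative)
        // ["--master", "local[*]", "--name", "my app"] -> --master local[*] --name "my app",
        // where quoteForCommandString supplies any quoting an argument needs.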
        StringBuilder submitArgs = new StringBuilder();
        for (String arg : buildSparkSubmitArgs()) {
            if (submitArgs.length() > 0) {
                submitArgs.append(" ");
            }
            submitArgs.append(quoteForCommandString(arg));
        }
        env.put(submitArgsEnvVariable, submitArgs.toString());
    }

    boolean isClientMode(Map<String, String> userProps) {
        String userMaster = firstNonEmpty(master, userProps.get(SparkLauncher.SPARK_MASTER));
        String userDeployMode = firstNonEmpty(deployMode, userProps.get(SparkLauncher.DEPLOY_MODE));
        // Default master is "local[*]", so assume client mode in that case
        return userMaster == null ||
            "client".equals(userDeployMode) ||
            (!userMaster.equals("yarn-cluster") && userDeployMode == null);
    }

    /**
     * Return whether the given main class represents a thrift server.
     */
    private boolean isThriftServer(String mainClass) {
        return (mainClass != null &&
            mainClass.equals("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"));
    }

    private List<String> findExamplesJars() {
        boolean isTesting = "1".equals(getenv("SPARK_TESTING"));
        List<String> examplesJars = new ArrayList<>();
        String sparkHome = getSparkHome();

        File jarsDir;
        if (new File(sparkHome, "RELEASE").isFile()) {
            jarsDir = new File(sparkHome, "examples/jars");
        } else {
            jarsDir = new File(sparkHome,
                String.format("examples/target/scala-%s/jars", getScalaVersion()));
        }

        boolean foundDir = jarsDir.isDirectory();
        checkState(isTesting || foundDir, "Examples jars directory '%s' does not exist.",
            jarsDir.getAbsolutePath());

        if (foundDir) {
            for (File f : jarsDir.listFiles()) {
                examplesJars.add(f.getAbsolutePath());
            }
        }
        return examplesJars;
    }

    public class OptionParser extends SparkSubmitOptionParser {

        boolean isSpecialCommand = false;
        private final boolean errorOnUnknownArgs;

        OptionParser(boolean errorOnUnknownArgs) {
            this.errorOnUnknownArgs = errorOnUnknownArgs;
        }

        @Override
        protected boolean handle(String opt, String value) {
            switch (opt) {
                case FILES:
                    files.addAll(Arrays.asList(value.split(",")));
                    break;
                case PY_FILES:
                    pyFiles.addAll(Arrays.asList(value.split(",")));
                    break;
                case EXECUTOR_CORES:
                    conf.put(SparkLauncher.EXECUTOR_CORES, value);
                    break;
                case EXECUTOR_MEMORY:
                    conf.put(SparkLauncher.EXECUTOR_MEMORY, value);
                    break;
                case DRIVER_CORES:
                    conf.put(SparkLauncher.DRIVER_CORES, value);
                    break;
                case NUM_EXECUTORS:
                    numExecutors = Integer.valueOf(value);
                    conf.put(SparkLauncher.EXECUTOR_INSTANCES, value);
                    break;
                case QUEUE:
                    queue = value;
                    break;
                case NAME:
                    appName = value;
                    break;
                case MASTER:
                    master = value;
                    break;
                case DEPLOY_MODE:
                    deployMode = value;
                    break;
                case PROPERTIES_FILE:
                    propertiesFile = value;
                    break;
                case DRIVER_MEMORY:
                    conf.put(SparkLauncher.DRIVER_MEMORY, value);
                    break;
                case DRIVER_JAVA_OPTIONS:
                    conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
                    break;
                case DRIVER_LIBRARY_PATH:
                    conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
                    break;
                case DRIVER_CLASS_PATH:
                    conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
                    break;
                case CONF:
                    checkArgument(value != null, "Missing argument to %s", CONF);
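                    // e.g. --conf spark.executor.memory=4g splits into key "spark.executor.memory"
                    // and value "4g"; the limit of 2 keeps any later '=' characters in the value.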
                    String[] setConf = value.split("=", 2);
                    checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
                    conf.put(setConf[0], setConf[1]);
                    break;
                case CLASS:
                    // The special classes require some special command line handling, since they allow
                    // mixing spark-submit arguments with arguments that should be propagated to the shell
                    // itself. Note that for this to work, the "--class" argument must come before any
                    // non-spark-submit arguments.
                    mainClass = value;
                    if (specialClasses.containsKey(value)) {
                        allowsMixedArguments = true;
                        appResource = specialClasses.get(value);
                    }
                    break;
                case KILL_SUBMISSION:
                case STATUS:
                    isSpecialCommand = true;
                    parsedArgs.add(opt);
                    parsedArgs.add(value);
                    break;
                case HELP:
                case USAGE_ERROR:
                case VERSION:
                    isSpecialCommand = true;
                    parsedArgs.add(opt);
                    break;
                default:
                    parsedArgs.add(opt);
                    if (value != null) {
                        parsedArgs.add(value);
                    }
                    break;
            }
            return true;
        }

        @Override
        protected boolean handleUnknown(String opt) {
            // When mixing arguments, add unrecognized parameters directly to the user arguments list. In
            // normal mode, any unrecognized parameter triggers the end of command line parsing, and the
            // parameter itself will be interpreted by SparkSubmit as the application resource. The
            // remaining params will be appended to the list of SparkSubmit arguments.
            if (allowsMixedArguments) {
                appArgs.add(opt);
                return true;
            } else if (isExample) {
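                // e.g. a bare "SparkPi" is expanded to "org.apache.spark.examples.SparkPi".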
                String className = opt;
                if (!className.startsWith(EXAMPLE_CLASS_PREFIX)) {
                    className = EXAMPLE_CLASS_PREFIX + className;
                }
                mainClass = className;
                appResource = SparkLauncher.NO_RESOURCE;
                return false;
            } else if (errorOnUnknownArgs) {
                checkArgument(!opt.startsWith("-"), "Unrecognized option: %s", opt);
                checkState(appResource == null, "Found unrecognized argument but resource is already set.");
                appResource = opt;
                return false;
            }
            return true;
        }

        @Override
        protected void handleExtraArgs(List<String> extra) {
            appArgs.addAll(extra);
        }

    }

}
